flinkkafka(Flink如何管理Kafka 消费位点(译文)) 您所在的位置:网站首页 kafka topic offset机制 flinkkafka(Flink如何管理Kafka 消费位点(译文))

flinkkafka(Flink如何管理Kafka 消费位点(译文))

2023-03-26 13:01| 来源: 网络整理| 查看: 265

本文目录Flink如何管理Kafka 消费位点(译文)Flink是如何从kafka中拉取数据的flink处理数据从kafka到另外一个kafkaflink消费kafka细节Flink 配置Kafka数据源Flink kafka kerberos的配置Flink实战之Kafka To Hive4.一文搞定:Flink与Kafka之间的精准一次性Flink消费Kafka如何保证相同标识消息的有序性Flink如何管理Kafka 消费位点(译文)

Checkpointing 是 Flink 故障恢复的内部机制。一个 checkpoint 就是 Flink应用程序产生的状态的一个副本。如果 Flink 任务发生故障,它会从 checkpoint 中载入之前的状态来恢复任务,就好像故障没有发生一样。

Checkpoints是 Flink 容错的基础,并且确保了 Flink 流式应用在失败时的完整性。Checkpoints 可以通过 Flink 设置定时触发。

Flink Kafka consumer使用 Flink 的 checkpoint 机制来存储 Kafka 每个分区的位点到 state。当 Flink 执行 checkpoint 时,Kafka 的每个分区的位点都被存储到 checkpoint 指定的 filesystem 中。Flink 的 checkpoint 机制确保了所有任务算子的状态是一致的,也就是说这些状态具有相同的数据输入。当所有的任务算子成功存储他们自己的状态后,代表一次 checkpoint 的完成。因此,当任务从故障中恢复时,Flink 保证了exactly-once。

下面将一步一步的演示 Flink 是如何通过 checkpoint 来管理 Kafka 的 offset 的。

下面的例子从两个分区的 Kafka topic 中读取数据,每个分区的数据是 “A”, “B”, “C”, ”D”, “E”。假设每个分区都是从 0 开始读取。

假设 Flink Kafka consumer 从分区 0 开始读取数据 “A”,那么此时第一个 consumer 的位点从 0 变成 1。如下图所示。

此时数据 “A” 到达 Flink Job 中的 Map Task。两个 consumer 继续读取数据 (从分区 0 读取数据 “B” ,从分区 1 读取数据 “A”)。 offsets 分别被更新成 2 和 1。与此同时,假设 Flink 从 source 端开始执行 checkpoint。

到这里,Flink Kafka consumer tasks 已经执行了一次快照,offsets也保存到了 state 中(“offset = 2, 1”) 。此时 source tasks 在 数据 “B” 和 “A” 后面,向下游发送一个 checkpoint barrier。checkpoint barriers 是 Flink 用来对齐每个任务算子的 checkpoint,以确保整个 checkpoint 的一致性。分区 1 的数据 “A” 到达 Flink Map Task, 与此同时分区 0 的 consumer 继续读取下一个消息(message “C”)。

Flink Map Task 收到上游两个 source 的 checkpoint barriers 然后开始执行 checkpoint ,把 state 保存到 filesystem。同时,消费者继续从Kafka分区读取更多事件。

假设 Flink Map Task 是 Flink Job 的最末端,那么当它完成 checkpoint 后,就会立马通知 Flink Job Master。当 job 的所有 task 都确认其 state 已经 “checkpointed”,Job Master将完成这次的整个 checkpoint。 之后,checkpoint 可以用于故障恢复。

如果发生故障(例如,worker 挂掉),则所有任务将重启,并且它们的状态将被重置为最近一次的 checkpoint 的状态。 如下图所示。

source 任务将分别从 offset 2 和 1 开始消费。当任务重启完成, 将会正常运行,就像之前没发生故障一样。

PS: 文中提到的 checkpoint 对齐,我说下我的理解,假设一个 Flink Job 有 Source -》 Map -》 Sink,其中Sink有多个输入。那么当一次checkpoint的 barrier从source发出时,到sink这里,多个输入需要等待其它的输入的barrier已经到达,经过对齐后,sink才会继续处理消息。这里就是exactly-once和at-least-once的区别。

The End原文链接: How Apache Flink manages Kafka consumer offsets

Flink是如何从kafka中拉取数据的

首先来看一下 FlinkKafkaConsumerBase.run方法,相当于是Flink 从kafka中拉取数据的入口方法:

createFetcher方法

返回了一个 KafkaFetcher对象,我们点进去看一下KafkaFetcher的构造器里面创建了一个 KafkaConsumerThread对象

至此为止createFetch就介绍完了,也可以看作是拉取数据的准备工作,接下来看一下kafkaFetcher.runFetchLoop();KafkaFetch中的runFetchLoop方法,正式开始从kafka中拉取message

既然consumerThread.start()开始了实际的kafka consumer,我们一起来看一下consumerThread中的方法

至此如何从kafka中拉取数据,已经介绍完了

flink处理数据从kafka到另外一个kafka

需求就是将流量数据(json格式)中某个接口数据抽取一下。如:有个identityUri=“yiyang/user/getById/13782“ , 这里的13782,是个userId,我们需要将其处理成 identityUri=“yiyang/user/getById/{}“

实际上我们生产中是将二者接口使用的。先使用2,如果没有匹配到,在使用1

这里是演示flink kafka的用法,我们简单使用正则处理

注意:kafka消费的方式是: kafkaConsumer.setStartFromGroupOffsets();

看下上面的启动日志,有这样的信息:Resetting offset for partition yiyang-0 to offset 22.

我们另外启动一个程序,发送消息,并消费两个topic中的数据

看下 ConsumeKafkaTest 中的日志

在看下另外一个服务(消费两个topic数据)的日志:

说明已经成功的把处理好的消息发送到另外一个topic中了

关于数据处理,如果只是简单的增加字段,减少字段,正则替换,也可以使用logstash工具

flink消费kafka细节

# flink消费kafka细节

Apache kafka connector提供对Kafka服务的事件流的访问。Flink提供了特殊的Kafka连接器,用于从Kafka主题读写数据。 Flink Kafka Consumer与Flink的检查点机制集成在一起,以提供一次精确的处理语义。 为此,Flink不仅仅依赖于Kafka的消费者群体偏移量跟踪,还内部跟踪和检查这些偏移量。

请为您的用例和环境选择一个包(Maven项目ID)和类名。 对于大多数用户来说,FlinkKafkaConsumer08(flink-connector-kafka的一部分)是合适的。

Maven Dependency                | Supported since | Consumer and Producer Class name            | Kafka version | Notes                                                       

:------------------------------ | :-------------- | :------------------------------------------ | :------------ | :-----------------------------------------------------------

flink-connector-kafka-0.8_2.11  | 1.0.0          | FlinkKafkaConsumer08 FlinkKafka

### kafka生产者API的使用

Flink 配置Kafka数据源

flink中已经预置了kafka相关的数据源实现FlinkKafkaConsumer010,先看下具体的实现:

kafka的Consumer有一堆实现,不过最终都是继承自FlinkKafkaConsumerBase,而这个抽象类则是继承RichParallelSourceFunction,是不是很眼熟,跟自定义mysql数据源继承的抽象类RichSourceFunction很类似。

可以看到,这里有很多构造函数,我们直接使用即可。

说明:

a、这里直接使用properties对象来设置kafka相关配置,比如brokers、zk、groupId、序列化、反序列化等。

b、使用FlinkKafkaConsumer010构造函数,指定topic、properties配置

c、SimpleStringSchema仅针对String类型数据的序列化及反序列化,如果kafka中消息的内容不是String,则会报错;看下SimpleStringSchema的定义:

d、这里直接把获取到的消息打印出来。

Flink kafka kerberos的配置

Flink消费集成kerberos认证的kafka集群时,需要做一些配置才可以正常执行。

    Flink版本:1.8;kafka版本:2.0.1;Flink模式:Standalone

    //指示是否从 Kerberos ticket 缓存中读取

    security.kerberos.login.use-ticket-cache: false1

   //Kerberos 密钥表文件的绝对路径

    security.kerberos.login.keytab: /data/home/keytab/flink.keytab

   //认证主体名称

    security.kerberos.login.principal: [email protected]

    //Kerberos登陆contexts

    security.kerberos.login.contexts: Client,KafkaClient

  val properties: Properties =new Properties()

  properties.setProperty(“bootstrap.servers“,“broker:9092“)

  properties.setProperty(“group.id“,“testKafka“)

  properties.setProperty(“security.protocol“,“SASL_PLAINTEXT“)

  properties.setProperty(“sasl.mechanism“,“GSSAPI“)

  properties.setProperty(“sasl.kerberos.service.name“,“kafka“)

  consumer =new   FlinkKafkaConsumer(“flink“,new SimpleStringSchema(), properties)

    参数说明 :security.protocol 

    运行参数可以配置为PLAINTEXT(可不配置)/SASL_PLAINTEXT/SSL/SASL_SSL四种协议,分别对应Fusion Insight Kafka集群的21005/21007/21008/21009端口。 如果配置了SASL,则必须配置sasl.kerberos.service.name为kafka,并在conf/flink-conf.yaml中配置security.kerberos.login相关配置项。如果配置了SSL,则必须配置ssl.truststore.location和ssl.truststore.password,前者表示truststore的位置,后者表示truststore密码。

Flink实战之Kafka To Hive

传统的入库任务一般借助于MapReduce或者Spark来写hive表,一般都是天级别最多小时级别的任务。随着实时性要求越来越高,传统的入库不太能满足需求。Flink完全基于流式处理,同时也支持了写Hive表。本文介绍一下如果通过FlinkSQL实现kafka数据入库hive,并能够实时可查。

由于写hive表必须基于hive catalog,所以需要 注册hive catalog 。同时可以在一个job内切换catalog,如果我们不想把kafka的source table注册到hive metastore里面,那么就可以使用memory catalog。

完整SQL如下

以上sql需要借助 sql submit 来提交。

对于已有的hive表,同样也是可以写入的。但是得通过alter table table_name set tblproperties(’property_name’=’new_value’);语法将flink需要用到的属性设置进去。

4.一文搞定:Flink与Kafka之间的精准一次性

在上一篇文章当中,也算是比较详细且通俗的聊了聊Flink是如何通过checkpoint机制来完成数据精准一次性的实现的。并且也在上一章的结尾表示,要在接下来聊一聊Flink与它的铁哥们Kafaka之间,是如何实现数据的精准一次性消费的。本次的聊法,还是要通过以kafka(source)-》Flink,Flink(source)-》Kafka来分别展开讨论。

kafka是一个具有数据保存、数据回放能力的消息队列,说白了就是kafka中的每一个数据,都有一个专门的标记作为标识。而在Flink消费kafka传入的数据的时候,source任务就能够将这个偏移量以算子状态的角色进行保存,写入到设定好的检查点中。这样一旦发生故障,Flink中的FlinkKafkaProduce连接器就i能够按照自己保存的偏移量,自己去Kafka中重新拉取数据,也正是通过这种方式,就能够确保Kafka到Flink之间的精准一次性。

在上一篇文章当中,已经表明了,如果想要让输出端能够进行精准一次性消费,就需要使用到幂等性或者是事务。而事务中的两阶段提交是所有方案里面最好的实现。

其实Flink到Kafak之间也是采用了这种方式,具体的可以看一下ctrl进到FlinkKafkaProduce连接器内部去看一看:

这也就表明了,当数据通过Flink发送给sink端Kafka的时候,是经历了两个阶段的处理的。第一阶段就是Flink向Kafka中插入数据,进入预提交阶段。当JobManager发送的Ckeckpoint保存成功信号过来之后,才会提交事务进行正式的数据发送,也就是让原来不可用的数据可以被使用了。

这个实现过程到目前阶段就很清晰了,它的主体流程无非就是在开启检查点之后,由JobManager向各个阶段的处理逻辑发送有关于检查点的barrier。所有的计算任务接收到之后,就会根据自己当前的状态做一个检查点保存。而当这个barrier来到sink任务的时候,sink就会开启一个事务,然后通过这个事务向外预写数据。直到Jobmanager来告诉它这一次的检查点已经保存完成了,sink就会进行第二次提交,数据也就算是成功写出了。

1.必须要保证检查点被打开了,如果检查点没有打开,那么之前说的一切话都是空谈。因为Flink默认检查点是关着的。2.在FlinkKafakProducer连接器的构造函数中要传入参数,这个参数就是用来保证状态一致性的。就是在构造函数的最后一个参数输入如下:

3.配置Kafka读取数据的隔离级别在kafka中有个配置,这个配置用来管理Kafka读取数据的级别。而这个配置默认是能够读取预提交阶段的数据的,所以如果你没改这个配置,那两阶段提交的第一阶段就是白费了。所以需要改一下这个配置,来更换一下隔离级别:

4.事务超时时间这个配置也很有意思,大家试想一下。如果要进行两阶段提交,就要保证sink端支持事务,Kafka是支持事务的,但是像这个组件对于很多机制都有一个超时时间的概念,也就是说如果时间到了这个界限还没完成工作,那就会默认这个工作失败。Kafka中由这个概念,Flink中同样由这个概念。但是flink默认超时时间是1小时,而Kafka默认是15分钟,这就有可能出现检查点保存东西的时间大于15分钟,假如说是16分钟保存完成然后给sink发送检查点保存陈功可以提交事务的信号,但是这个时候Kafka已经认为事务失败,把之前的数据都扔了。那数据不就是丢失了么。所以说Kafka的超时时间要大于Flink的超时时间才好。

截止到目前为止,基本上把有关于状态维护的一些东西都说完了,有状态后端、有检查点。还通过检查点完成可端到端的数据精准一次性消费。但是想到这我又感觉,如果有学习进度比我差一些的,万一没办法很好的理解怎么办。所以在下一篇文章当中我就聊聊Flink中的“状态”到底是个什么东西,都有什么类型,都怎么去用。

Flink消费Kafka如何保证相同标识消息的有序性

在某些情况下,我们需要保证flink在消费kafka时,对于某些具有相同标识的消息,要保证其顺序性。比如说具有相同uuid的用户行为消息,要保证其消息的顺序性,这样才能有效分析其用户行为。问题:kafka只能保证同一个partition内的消息是顺序性的,但是整个topic下并不能保证是顺序的,那么该如何解决呢?

《1》 在生产消息时,就将具有相同uuid的消息分配到同一个分区中。扩展:kafka topic消息分配partition规则源码:

通过源码,分区器就会根据消息里面的分区参数key值将消息分到对应的partition。1)如果没有指定key值并且可用分区个数大于0时,在就可用分区中做轮询决定改消息分配到哪个partition2)如果没有指定key值并且没有可用分区时,在所有分区中轮询决定改消息分配到哪个partition3)如果指定key值,对key做hash分配到指定的pa



【本文地址】

公司简介

联系我们

今日新闻

    推荐新闻

    专题文章
      CopyRight 2018-2019 实验室设备网 版权所有